Introduction to Column-Oriented Data Storage
Deep Dive into Parquet
Working with Arrow in R
Querying Parquet with Different Engines
Arrow Datasets for Larger-than-Memory Operations
Partitioning Strategies
Hands-on Workshop: Analysis with PUMS Data
Data has to be represented somewhere, both during analysis and when storing.
The shape and characteristics of this representation have a huge impact on performance.
What if you could speed up a key part of your analysis by 30x and reduce your storage by 10x?
Row-oriented
|ID|Name |Age|City |
|--|-----|---|--------|
|1 |Alice|25 |New York|
|2 |Bob |30 |Boston |
|3 |Carol|45 |Chicago |
Column-oriented
ID: [1, 2, 3]
Name: [Alice, Bob, Carol]
Age: [25, 30, 45]
City: [New York, Boston, Chicago]
And you use column-oriented dataframes already!
… but still storing my data in a fundamentally row-oriented way.
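A quick base-R sketch (toy data, no packages needed) showing that a data frame is already a list of column vectors in memory, even though CSV serializes it one row per line:

```r
# A data.frame is a list of column vectors: column-oriented in memory
df <- data.frame(
  ID   = c(1L, 2L, 3L),
  Name = c("Alice", "Bob", "Carol"),
  Age  = c(25L, 30L, 45L),
  City = c("New York", "Boston", "Chicago")
)

# Each column is one contiguous vector you can grab directly
is.list(df)   # a data.frame is a list...
df$Age        # ...whose elements are the columns: 25 30 45

# But write.csv() serializes it one row per line
csv_path <- tempfile(fileext = ".csv")
write.csv(df, csv_path, row.names = FALSE)
readLines(csv_path)[2]   # the first data row, mixing all four columns
```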
As a CSV file
user system elapsed
14.449 0.445 15.037
As a Parquet file
user system elapsed
1.017 0.207 0.568
data <- tibble::tibble(
integers = 1:10,
doubles = as.numeric(1:10),
strings = sprintf("%02d", 1:10)
)
write.csv(data, "numeric_base.csv", row.names = FALSE)
write_csv_arrow(data, "numeric_arrow.csv")
write_parquet(data, "numeric.parquet")
df_csv <- read.csv("numeric_base.csv")
df_csv_arrow <- read_csv_arrow("numeric_arrow.csv")
df_parquet <- read_parquet("numeric.parquet")
Are there any differences?
user system elapsed
0.027 0.003 0.031
user system elapsed
1.017 0.207 0.568
user system elapsed
1.017 0.207 0.568
Languages with native Parquet support:
Systems with Parquet integration:
# Read a Parquet file into R
data <- read_parquet("CA_person_2021.parquet")
# Write an R data frame to Parquet
write_parquet(data, "CA_person_2021_new.parquet")
# Reading a subset of columns
df_subset <- read_parquet(
"CA_person_2021.parquet",
col_select = c("PUMA", "COW", "AGEP")
)
# Reading with a row filter (predicate pushdown)
df_filtered <- open_dataset("CA_person_2021.parquet") |>
filter(AGEP > 40) |>
collect()
library(duckdb)
con <- dbConnect(duckdb())
# Register a Parquet file as a virtual table
dbExecute(con, "CREATE VIEW pums AS SELECT *
FROM read_parquet('CA_person_2021.parquet')")
# Run our query
dbGetQuery(con, "
SELECT SUM(JWMNP * PWGTP)/SUM(PWGTP) as avg_commute_time,
COUNT(*) as count
FROM pums
WHERE AGEP >= 16
")
dbDisconnect(con, shutdown = TRUE)
Arrow Table
Arrow Dataset
pums_ds <- open_dataset("data/person")
# Examine the dataset, list files
print(pums_ds)
head(pums_ds$files)
# Query execution with lazy evaluation
pums_ds |>
filter(AGEP >= 16) |>
group_by(year, ST) |>
summarize(
mean_commute_time = sum(JWMNP * PWGTP, na.rm = TRUE) /
sum(PWGTP),
count = n()
) |>
collect()
Execution is lazy: nothing is computed until collect() is called.
arrow can work with data and datasets in cloud storage. This can be a good option if you don’t have access to a formal DBMS.
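A minimal sketch of the lazy-evaluation behavior, using a toy dataset written to a temporary directory (assumes the arrow and dplyr packages are installed; column names mirror the PUMS examples but the data is made up):

```r
library(arrow)
library(dplyr)

# Write a tiny dataset to a temp directory
dir <- tempfile()
write_dataset(data.frame(AGEP = c(10, 20, 30), PWGTP = c(1, 2, 3)), dir)

ds <- open_dataset(dir)

# Building the pipeline does no work yet: this is a lazy query object
q <- ds |>
  filter(AGEP >= 16) |>
  summarize(n = n())
class(q)   # an arrow dplyr query, not a data frame

# collect() triggers execution and pulls the result into R
collect(q)
```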
Hive Partitioning
Directory format: column=value
Example:
person/
├── year=2018/
│ ├── state=NY/
│ │ └── data.parquet
│ └── state=CA/
│ └── data.parquet
├── year=2019/
│ ├── ...
Self-describing structure
Standard in big data ecosystem
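A sketch of producing this Hive-style layout with write_dataset() (toy data and a temp directory; assumes the arrow package is installed):

```r
library(arrow)

toy <- data.frame(
  year  = c(2018L, 2018L, 2019L),
  state = c("NY", "CA", "NY"),
  x     = 1:3
)

out <- tempfile()
# hive_style = TRUE is the default: directories are named column=value
write_dataset(toy, out, partitioning = c("year", "state"))

# Directory names carry both the column name and the value,
# e.g. year=2018/state=NY/...
list.files(out, recursive = TRUE)
```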
Non-Hive Partitioning
Directory format: value
Example:
person/
├── 2018/
│ ├── NY/
│ │ └── data.parquet
│ └── CA/
│ └── data.parquet
├── 2019/
│ ├── ...
Requires column naming
Less verbose directory names
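For comparison, a sketch of the non-Hive layout: write with hive_style = FALSE, then supply the partition column names yourself when reading back (same toy data and assumptions as above):

```r
library(arrow)

toy <- data.frame(
  year  = c(2018L, 2018L, 2019L),
  state = c("NY", "CA", "NY"),
  x     = 1:3
)

out <- tempfile()
# Directories are bare values (2018/NY/...), so the column names are lost
write_dataset(toy, out, partitioning = c("year", "state"), hive_style = FALSE)
list.files(out, recursive = TRUE)

# When reading, you must name the partition columns yourself
ds <- open_dataset(out, partitioning = c("year", "state"))
names(ds)   # includes year and state recovered from the directory levels
```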
# Calculate average income by state for 2021
result1 <- pums_ds |>
filter(year == 2021) |>
filter(PINCP > 0) |> # Positive income only
group_by(location) |>
summarize(
avg_income = mean(PINCP, na.rm = TRUE),
median_income = quantile(PINCP, 0.5, na.rm = TRUE),
n = n()
) |>
arrange(desc(avg_income)) |>
collect()
# View results
head(result1)
library(arrow)
library(duckdb)
library(DBI)
# Open the PUMS dataset with Arrow
pums_ds <- open_dataset("data/person")
# Filter to just 2021 data for Washington
wa_2021 <- pums_ds |>
filter(year == 2021, location == "wa") |>
collect()
# Create DuckDB connection
con <- dbConnect(duckdb())
# Register Arrow Table with DuckDB
duckdb::duckdb_register_arrow(con, "pums_wa", wa_2021)
# Run SQL query
result <- dbGetQuery(con, "
SELECT
CASE
WHEN AGEP < 18 THEN 'Under 18'
WHEN AGEP < 30 THEN '18-29'
WHEN AGEP < 45 THEN '30-44'
WHEN AGEP < 65 THEN '45-64'
ELSE '65+'
END AS age_group,
AVG(JWMNP) AS avg_commute_time,
COUNT(*) AS n
FROM pums_wa
WHERE JWMNP > 0
GROUP BY age_group
ORDER BY age_group
")
# Disconnect when done
dbDisconnect(con, shutdown = TRUE)
Resources:
Questions?